package com.qf.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * Kafka consumer demo: manual (async) offset commits, starting consumption
 * from the offset closest to a target timestamp on every partition of the topic.
 *
 * @author Thor
 * @公众号 Java架构栈
 */
public class MyConsumerDemo1 {

    private static final String TOPIC_NAME = "my-cluster-topic";
    private static final String CONSUMER_GROUP_NAME = "my-group-1";


    public static void main(String[] args) {

        // 1. Consumer configuration.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.137:9092,192.168.10.137:9093,192.168.10.137:9094");
        // Consumer group this consumer belongs to.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        // Key/value deserializers.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Interval between auto-commits (only relevant when auto-commit is enabled).
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Auto-commit disabled: offsets are committed manually in pollFromConsumer.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // Max records returned by one poll(); tune to how fast this consumer processes.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

        /*
         * Where to start when the group has no committed offset (brand-new group,
         * or the committed offset no longer exists):
         *   latest (default): consume only messages produced after this consumer started
         *   earliest: start from the beginning the first time, then resume from the
         *             committed offset — unlike consumer.seekToBeginning, which
         *             rewinds to the beginning on every start
         */
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


        // 2. Create the consumer.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 3. Subscribe — use exactly one of subscribe / assign.
        //consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // Consume a specific partition only:
        //consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

        // Replay the given partitions from the beginning:
        /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0),new TopicPartition(TOPIC_NAME, 1)));
        consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0), new TopicPartition(TOPIC_NAME, 1)));*/

        // Start from an explicit offset:
        /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        consumer.seek(new TopicPartition(TOPIC_NAME, 0), 1);
        consumer.seek(new TopicPartition(TOPIC_NAME, 1), 1);*/


        // Start from a point in time: ask the broker which offset corresponds to
        // the target timestamp on each partition of the topic.
        List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
        // Consume from 1 hour ago. (The original multiplied by an extra 24 — i.e.
        // 24 hours — while its comment said 1 hour; fixed to match the stated intent.)
        long fetchDataTime = System.currentTimeMillis() - Duration.ofHours(1).toMillis();
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        for (PartitionInfo par : topicPartitions) {
            timestampsToSearch.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
        }
        // Resolve each timestamp to an offset; a null OffsetAndTimestamp means the
        // partition has no message at or after the timestamp.
        Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(timestampsToSearch);
        // LinkedHashMap keeps partition→offset pairs together (the original used two
        // parallel lists, which is easy to get out of sync).
        Map<TopicPartition, Long> seekOffsets = new LinkedHashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
            TopicPartition key = entry.getKey();
            OffsetAndTimestamp value = entry.getValue();
            if (key == null || value == null) {
                continue;
            }
            long offset = value.offset();
            System.out.println("partition->" + key.partition() + " | offset->" + offset);
            System.out.println();
            seekOffsets.put(key, offset);
        }
        if (seekOffsets.isEmpty()) {
            System.out.println("该时间段内没有消息，请重新设置~");
            // Release sockets/buffers before bailing out (the original leaked here).
            consumer.close();
            return;
        }
        consumer.assign(seekOffsets.keySet());
        seekOffsets.forEach(consumer::seek);


        // 4. Poll and process messages from the topic (never returns).
        pollFromConsumer(consumer);
    }

    /**
     * Polls the already-assigned/subscribed consumer forever, printing each
     * record and asynchronously committing offsets after every non-empty batch.
     * Requires {@code enable.auto.commit=false}; never returns.
     *
     * @param consumer a consumer that has already been subscribed or assigned
     */
    static void pollFromConsumer(Consumer<String, String> consumer) {
        while (true) {
            // Long poll: wait up to 1s for a batch of records.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("收到消息：partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
                        record.offset(), record.key(), record.value());
            }
            if (records.count() > 0) {
                // Manual commit can be synchronous (blocks until the broker acks)
                // or asynchronous with a completion callback (below).
//                consumer.commitSync();
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        System.out.println("提交失败");
                    } else {
                        System.out.println("提交了offset -> " + offsets);
                    }
                });
            }
        }
    }

}
